package com.hzzftech.watchdog.busi.core.executor;

import com.alibaba.fastjson.JSON;
import com.hzzftech.watchdog.busi.config.KettleConfig;
import com.hzzftech.watchdog.busi.constants.BusiConstant;
import com.hzzftech.watchdog.busi.core.dance.JobChangeState;
import com.hzzftech.watchdog.busi.core.executor.starter.KettleEnvironmentStarter;
import com.hzzftech.watchdog.busi.core.executor.utils.JobExecuteUtil;
import com.hzzftech.watchdog.busi.core.util.KettleImgUtil;
import com.hzzftech.watchdog.busi.domain.KtDispatcher;
import com.hzzftech.watchdog.busi.domain.KtDispatcherFailedMsg;
import com.hzzftech.watchdog.busi.domain.KtExecutionLog;
import com.hzzftech.watchdog.busi.domain.KtJob;
import com.hzzftech.watchdog.busi.service.IKtDispatcherFailedMsgService;
import com.hzzftech.watchdog.busi.service.IKtDispatcherService;
import com.hzzftech.watchdog.busi.service.IKtExecutionLogService;
import com.hzzftech.watchdog.busi.service.IKtJobService;
import com.hzzftech.watchdog.busi.service.impl.KtExecutionLogServiceImpl;
import com.hzzftech.watchdog.busi.utils.TaskManageUtil;
import com.hzzftech.watchdog.common.core.domain.entity.SysUser;
import com.hzzftech.watchdog.common.json.JSONObject;
import com.hzzftech.watchdog.common.utils.StringUtils;
import com.hzzftech.watchdog.common.utils.spring.SpringUtils;
import com.hzzftech.watchdog.common.utils.uuid.IdUtils;
import com.hzzftech.watchdog.system.service.ISysUserService;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.LongObjectId;
import org.pentaho.di.repository.RepositoriesMeta;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Date;
import java.util.concurrent.Callable;

/**
 * JOB任务执行器
 */
public class JobExecutor implements Callable<String> {
    public static Logger logger = LoggerFactory.getLogger(JobExecutor.class);

    private JobExecutionConfiguration configuration;
    private JobMeta meta;
    boolean isClickStop = false;
    private String logId;
    private KtExecutionLog log;
    private String key;
    private String dpRepoType;
    private Long jobId;

    /**
     * @param configuration 执行配置
     * @param meta job元数据
     * @param dpRepoType 仓库类型 0 git资源库 1 文件资源库 2 数据库资源库
     */
    public JobExecutor(JobExecutionConfiguration configuration, JobMeta meta, String dpRepoType, Long jobId) {
        this.configuration = configuration;
        this.meta = meta;
        this.dpRepoType = dpRepoType;
        this.jobId = jobId;
    }

    public JobExecutor(JobExecutionConfiguration configuration, JobMeta meta,String dpRepoType, String logId, Long jobId) {
        this.configuration = configuration;
        this.meta = meta;
        this.logId = logId;
        this.dpRepoType = dpRepoType;
        this.jobId = jobId;
    }

    @Override
    public String call() throws Exception {
        IKtExecutionLogService logService = SpringUtils.getBean(KtExecutionLogServiceImpl.class);
        IKtJobService jobService = SpringUtils.getBean(IKtJobService.class);
        KettleConfig kettleConfig = SpringUtils.getBean(KettleConfig.class);
        boolean insertFlag = false;
        if (StringUtils.isEmpty(logId)) {
            logId = IdUtils.fastUUID();
            logger.info("Job log id: "+logId);
            log = new KtExecutionLog();
            log.setDpRepoType(dpRepoType);
            log.setId(logId);
            insertFlag = true;
        } else {
            log = logService.selectKtExecutionLogById(logId);
        }

        key = BusiConstant.KEY_PREFFIX+logId;
        try {
            TaskManageUtil.getInstance().add(key, this);
            int status = DispatcherStatus.STATUS_RUNNING.getStatus();
            // 执行日志的更新或新增
            log.setStartTime(new Date());
            log.setStatus(String.valueOf(status));
            log.setTaskName(meta.getName());
            log.setTaskType(BusiConstant.KETTLE_TYPE_FLAG_JOB);
            log.setTaskId(Long.valueOf(meta.getObjectId().getId()));
            log.setExecutionMethod("远程："+configuration.getRemoteServer().getHostname());
            log.setSlaveId(Long.parseLong(configuration.getRemoteServer().getObjectId().getId()));
            log.setExecutionConfiguration(JSON.toJSONString(configuration.getVariables()));
            if (jobId != null) {
                KtJob ktJob = jobService.selectJobById(jobId);
                if (ktJob != null) {
                    log.setJobId(jobId);
                    log.setJobName(ktJob.getJobName());
                }
            }

            String transImg = KettleImgUtil.getJobImg(meta, kettleConfig.getGIT_FILE_REPOSITORY_LOG_FILE()+ File.separator +"job");
            log.setImagePath(transImg);

            // 发送远程执行
            String carteId = "";

            if (BusiConstant.REPOSITORY_TYPE_DB.equals(dpRepoType)) {

                KettleDatabaseRepository kettleDatabaseRepository = (KettleDatabaseRepository) KettleEnvironmentStarter.instance.getRepository();

                KettleDatabaseRepositoryMeta repositoryMeta = kettleDatabaseRepository.getRepositoryMeta();
                RepositoriesMeta repositoriesMeta = new RepositoriesMeta();
                repositoriesMeta.addDatabase(kettleDatabaseRepository.getDatabaseMeta());
                repositoriesMeta.addRepository(repositoryMeta);
                Repository repository = this.configuration.connectRepository(repositoriesMeta, kettleDatabaseRepository.getName(), kettleConfig.getRepoUserName(), kettleConfig.getRepoPassword());
                carteId = Job.sendToSlaveServer(meta, this.configuration, repository, null);
            } else {
                // 发送远程执行
                carteId = JobExecuteUtil.sendToSlaveServer(meta, configuration, null);
            }

            logger.info("Job carte obj id: "+carteId);
            log.setCarteId(carteId);
            logger.info("[JobExecutor] 发送任务至carte服务,任务名称[{}]", meta.getName());
        } catch (Exception e) {
            logger.error("执行任务失败",e);
            log.setStatus(String.valueOf(DispatcherStatus.STATUS_FAIL.getStatus()));
            if (e instanceof KettleException) {
                log.setRemark("发送远程服务失败，调度ID："+meta.getObjectId().getId()+"，错误信息："+((KettleException)e).getSuperMessage());
            } else{
                log.setRemark("发送远程服务失败, 调度ID："+meta.getObjectId().getId());
            }
            log.setEndTime(new Date());
            IKtDispatcherService dispatcherService = SpringUtils.getBean(IKtDispatcherService.class);
            KtDispatcher dispatcher = dispatcherService.selectKtDispatcherById(Long.valueOf(meta.getObjectId().getId()));

            if (StringUtils.isNotEmpty(dispatcher.getWarnMsgIds() )) {
                IKtDispatcherFailedMsgService failedMsgService = SpringUtils.getBean(IKtDispatcherFailedMsgService.class);
                ISysUserService userService = SpringUtils.getBean(ISysUserService.class);
                for (String id : dispatcher.getWarnMsgIds().split(",")) {
                    KtDispatcherFailedMsg msg = new KtDispatcherFailedMsg();
                    msg.setDpId(Long.valueOf(meta.getObjectId().getId()));
                    msg.setFailedTime(new Date());
                    msg.setSendTime(0L);
                    msg.setStatus(BusiConstant.STATUS_YES);
                    msg.setDpName(dispatcher.getDpName());
                    msg.setUserId(Long.parseLong(id));
                    SysUser user = userService.selectUserById(Long.parseLong(id));
                    msg.setUserName(user.getUserName());
                    msg.setEmail(user.getEmail());
                    msg.setPhone(user.getPhonenumber());
                    failedMsgService.insertKtDispatcherFailedMsg(msg);
                }
            }
        }   finally {
            if (insertFlag) {
                logService.insertKtExecutionLog(log);
            } else {
                logService.updateKtExecutionLog(log);
            }
            JobChangeState.instance.jobStateChange();
            TaskManageUtil.getInstance().remove(key);
        }
        return "Job End";
    }

    public void stop() {
//        if (null!= job) {
//            job.stopAll();
//        }
    }

    public void setClickStop(boolean clickStop) {
        this.isClickStop = clickStop;
    }
}
